package com.sn.campus.nettyserver;


import com.sn.campus.config.SaveData;
import com.sn.campus.entity.TBaseSos;
import com.sn.campus.utils.MsgParse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;

/**
 * @program: qingcheng
 * @author: XIONG CHUAN
 * @create: 2019-04-28 15:21
 * @description: Netty server-side channel handler
 **/


public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private final Logger log = LogManager.getLogger(getClass());

    /**
     * Shape every accepted report must have: the letter {@code L}, one letter in A-G,
     * eight digits, twelve alphanumeric characters, then any trailing text.
     * Compiled once instead of on every inbound message.
     */
    private static final Pattern REPORT_PATTERN =
            Pattern.compile("L[A-G]{1}[0-9]{8}[a-z0-9A-Z]{12}+.*");

    /**
     * Runs the {@link SaveData} tasks off the calling thread. A cached pool keeps the
     * original "never queue, never reject" behaviour of one thread per task, but reuses
     * idle threads instead of creating a new one for every message.
     */
    private static final ExecutorService SAVE_EXECUTOR = Executors.newCachedThreadPool();

    private static final ConcurrentHashMap<String, String> deviceStatus = new ConcurrentHashMap<>();

    /** Bounded message queue (capacity 1000). Not written to by this class. */
    public static LinkedBlockingQueue<String> reportFreqs = new LinkedBlockingQueue<String>(1000);

    /**
     * Global registry of the channels currently connected to the server,
     * keyed by channel id.
     */
    private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();

    /** Binds a device id to the id of the channel it is connected through. */
    private static final ConcurrentHashMap<String, ChannelId> deviceMap = new ConcurrentHashMap<>();

    /** Maps a device to its frequency-scan result. Not written to by this class. */
    public static volatile ConcurrentHashMap<String, String> deviceScanfreqs = new ConcurrentHashMap<>();


    /**
     * Called when a client connects: registers the channel in the global channel map
     * and logs the remote endpoint and the current connection count.
     *
     * @param ctx context of the newly active channel
     * @author xiongchuan on 2019/4/28 16:10
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        InetSocketAddress remote = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = remote.getAddress().getHostAddress();
        int clientPort = remote.getPort();

        // Unique identifier of this connection.
        ChannelId channelId = ctx.channel().id();

        // putIfAbsent makes "register only if unknown" a single atomic step.
        if (CHANNEL_MAP.putIfAbsent(channelId, ctx) != null) {
            log.info("client 【" + channelId + "】 is connected，client channel count: " + CHANNEL_MAP.size());
            return;
        }
        log.info("clent【" + channelId + "】 connecting [IP:" + clientIp + "--->PORT:" + clientPort + "]");
        log.info("client channel count: " + CHANNEL_MAP.size());
    }

    /**
     * Called when a client connection goes inactive: removes the channel from the
     * global channel map and drops every device binding that points at it.
     * Nothing is done (or logged) for a channel that was not registered.
     *
     * @param ctx context of the channel that became inactive
     * @author xiongchuan on 2019/4/28 16:10
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        ChannelId channelId = ctx.channel().id();

        // remove() both tests for and deletes the registration atomically.
        if (CHANNEL_MAP.remove(channelId) == null) {
            return;
        }

        // Drop stale device bindings. The two-argument remove only deletes the entry
        // while it still maps to this channel, so a device that has already been
        // re-bound to a newer channel keeps its new binding.
        for (Map.Entry<String, ChannelId> binding : deviceMap.entrySet()) {
            if (channelId.equals(binding.getValue())) {
                deviceMap.remove(binding.getKey(), channelId);
            }
        }

        // Resolved only after cleanup so a failure here cannot leave stale entries behind.
        InetSocketAddress remote = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = remote.getAddress().getHostAddress();
        log.info("client【" + channelId + "】 exit nettyserver[IP:" + clientIp + "--->PORT:" + remote.getPort() + "]");
        log.info("client channel count: " + CHANNEL_MAP.size());
    }

    /**
     * Called for every inbound message. The message is rendered with
     * {@code toString()}, logged, validated against {@link #REPORT_PATTERN} and, when
     * it matches, handed to {@link #dealMessage(String)}. Non-matching messages are
     * logged and dropped. The message is not forwarded to later handlers.
     *
     * <p>NOTE(review): presumably an upstream decoder already turns frames into
     * {@code String}; if reference-counted buffers can reach this handler they are
     * not released here - confirm against the pipeline setup.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the inbound message
     * @author xiongchuan on 2019/4/28 16:10
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (null == msg) {
            log.info("the msg is null at"+ctx.channel().id());
            return;
        }
        // Render once and reuse for logging, validation and dispatch.
        String text = msg.toString();
        log.info("加载客户端报文......");
        log.info("【" + ctx.channel().id() + "】" + " :" + text);
        if (!REPORT_PATTERN.matcher(text).matches()) {
            log.info("终端数据上报异常！！！");
            return;
        }
        dealMessage(text);
    }

    /**
     * Sends the fixed acknowledgement {@code "LEOK#@"} to the client on the given
     * channel. Logs and returns if the channel is not registered.
     *
     * @param channelId unique id of the target channel
     * @param msg       currently ignored - the fixed acknowledgement is always sent;
     *                  kept so existing callers continue to compile
     * @author xiongchuan on 2019/4/28 16:10
     */
    public void channelWrite(ChannelId channelId, Object msg) {
        ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
        if (ctx == null) {
            log.info("channel【" + channelId + "】 not exists");
            return;
        }
        String reMsg = "LEOK#@";
        log.info("send message by channel:" + channelId + ":" + reMsg);
        // Queue the reply and flush it to the socket in one call.
        ctx.writeAndFlush(reMsg);
    }

    /**
     * Parses a validated report with {@link MsgParse#parseMessage} and acts on its
     * protocol type. Only {@code LDPORT} and {@code LA} messages are persisted (via a
     * {@link SaveData} task on a background thread); every other type is ignored.
     * Messages that fail to parse, or that carry no protocol type, are dropped.
     *
     * @param msg the raw report text
     */
    public void dealMessage(String msg) {
        TBaseSos tBaseSos = MsgParse.parseMessage(msg);
        if (tBaseSos == null) {
            return;
        }
        String protocolType = tBaseSos.getProtocolType();
        if (protocolType == null) {
            // A switch on a null String throws NullPointerException; drop the message instead.
            log.info("protocol type is missing, message ignored: " + msg);
            return;
        }
        switch (protocolType) {
            case "LDTEST":   // heartbeat
            case "LDIMEI":   // time synchronisation
            case "LDSRTC":   // seconds synchronisation
            case "LDSTAS":   // host status
                // Nothing to do for these types at the moment.
                break;
            case "LDPORT":   // video parameters
                log.info("打印视频参数，保存数据库"+tBaseSos.toString());
                SAVE_EXECUTOR.execute(new SaveData(tBaseSos));
                break;
            case "LA":   // alarm
                log.info("打印告警参数，保存数据库"+tBaseSos.toString());
                SAVE_EXECUTOR.execute(new SaveData(tBaseSos));
                break;
            default:
                break;
        }
    }

    /**
     * Disconnects the client when an idle timeout (reader, writer or all) is reported.
     * Any other user event is passed on to the next handler in the pipeline, as the
     * adapter's default implementation does, instead of being silently consumed.
     *
     * @param ctx context of the channel the event was raised on
     * @param evt the user event
     * @throws Exception if forwarding the event fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }

        IdleState state = ((IdleStateEvent) evt).state();
        String reason;
        if (state == IdleState.READER_IDLE) {
            reason = " READER_IDLE 读超时";
        } else if (state == IdleState.WRITER_IDLE) {
            reason = " WRITER_IDLE 写超时";
        } else if (state == IdleState.ALL_IDLE) {
            reason = " ALL_IDLE 总超时";
        } else {
            return;
        }

        // String.valueOf tolerates a null remote address.
        String socketString = String.valueOf(ctx.channel().remoteAddress());
        log.info("Client: " + socketString + reason);
        ctx.disconnect();
    }

    /**
     * Called when an exception reaches this handler: closes the connection and logs
     * the failure together with its cause and the current connection count.
     *
     * @param ctx   context of the channel the exception occurred on
     * @param cause the exception that was raised
     * @author xiongchuan on 2019/4/28 16:10
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        // Pass the throwable to the logger so the stack trace is not lost.
        log.error(ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size(), cause);
    }

    /** @return the live device-id to channel-id map (not a copy) */
    public static ConcurrentHashMap<String, ChannelId> getDeviceMap() {
        return deviceMap;
    }

    /** @return the live channel-id to context map (not a copy) */
    public static ConcurrentHashMap<ChannelId, ChannelHandlerContext> getChannelMap() {
        return CHANNEL_MAP;
    }

    /** @return the live device status map (not a copy) */
    public static ConcurrentHashMap<String, String> getDeviceStatus() {
        return deviceStatus;
    }

}
